diff --git a/swh/web/ui/apidoc.py b/swh/web/ui/apidoc.py index 18e2dd28b..13342a5f4 100644 --- a/swh/web/ui/apidoc.py +++ b/swh/web/ui/apidoc.py @@ -1,418 +1,440 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import re from functools import wraps from enum import Enum from flask import request, render_template, url_for from flask import g from swh.web.ui.main import app class argtypes(Enum): # noqa: N801 """Class for centralizing argument type descriptions """ ts = 'timestamp' int = 'integer' str = 'string' path = 'path' sha1 = 'sha1' uuid = 'uuid' sha1_git = 'sha1_git' algo_and_hash = 'algo_hash:hash' class rettypes(Enum): # noqa: N801 """Class for centralizing return type descriptions """ octet_stream = 'octet stream' list = 'list' dict = 'dict' class excs(Enum): # noqa: N801 """Class for centralizing exception type descriptions """ badinput = 'BadInputExc' notfound = 'NotFoundExc' class APIUrls(object): """ Class to manage API documentation URLs. * Indexes all routes documented using apidoc's decorators. * Tracks endpoint/request processing method relationships for use in generating related urls in API documentation Relies on the load_controllers logic in main.py for initialization. """ apidoc_routes = {} method_endpoints = {} @classmethod def get_app_endpoints(cls): return cls.apidoc_routes @classmethod def get_method_endpoints(cls, fname): if len(cls.method_endpoints) == 0: cls.method_endpoints = cls.group_routes_by_method() return cls.method_endpoints[fname] @classmethod def group_routes_by_method(cls): """ Group URL endpoints according to their processing method. Returns: A dict where keys are the processing method names, and values are the routes that are bound to the key method. 
""" endpoints = {} for rule in app.url_map.iter_rules(): rule_dict = {'rule': rule.rule, 'methods': rule.methods} if rule.endpoint not in endpoints: endpoints[rule.endpoint] = [rule_dict] else: endpoints[rule.endpoint].append(rule_dict) return endpoints @classmethod def index_add_route(cls, route, docstring, **kwargs): """ Add a route to the self-documenting API reference """ if route not in cls.apidoc_routes: d = {'docstring': docstring} for k, v in kwargs.items(): d[k] = v cls.apidoc_routes[route] = d class APIDocException(Exception): """ Custom exception to signal errors in the use of the APIDoc decorators """ class APIDocBase(object): """ The API documentation decorator base class, responsible for the operations that link the decorator stack together: * manages the _inner_dec property, which represents the decorator directly below self in the decorator tower * contains the logic used to return appropriately if self is the last decorator to be applied to the API function """ def __init__(self): self._inner_dec = None @property def inner_dec(self): return self._inner_dec @inner_dec.setter def inner_dec(self, instance): self._inner_dec = instance @property def data(self): raise NotImplementedError def process_rv(self, f, args, kwargs): """ From the arguments f has, determine whether or not it is the last decorator in the stack, and return the appropriate call to f. """ rv = None if 'outer_decorator' in f.__code__.co_varnames: rv = f(*args, **kwargs) else: nargs = {k: v for k, v in kwargs.items() if k != 'outer_decorator'} try: rv = f(*args, **nargs) except (TypeError, KeyError): # documentation call rv = None return rv def maintain_stack(self, f, args, kwargs): """ From the arguments f is called with, determine whether or not the stack link was made by @apidoc.route, and maintain the linking for the next call to f. 
""" if 'outer_decorator' not in kwargs: raise APIDocException('Apidoc %s: expected an apidoc' ' route decorator first' % self.__class__.__name__) kwargs['outer_decorator'].inner_dec = self kwargs['outer_decorator'] = self return self.process_rv(f, args, kwargs) class route(APIDocBase): # noqa: N801 """Decorate an API method to register it in the API doc route index and create the corresponding Flask route. This decorator is responsible for bootstrapping the linking of subsequent decorators, as well as traversing the decorator stack to obtain the documentation data from it. Args: route: documentation page's route noargs: set to True if the route has no arguments, and its result should be displayed anytime its documentation is requested. Default to False hidden: set to True to remove the endpoint from being listed in the /api endpoints. Default to False. tags: Further information on api endpoints. Two values are possibly expected: - hidden: remove the entry points from the listing - upcoming: display the entry point but it is not followable """ def __init__(self, route, noargs=False, tags=[]): super().__init__() self.route = route self.noargs = noargs self.tags = set(tags) def __call__(self, f): options = {} if 'hidden' not in self.tags: if self.tags: options['tags'] = self.tags APIUrls.index_add_route(self.route, f.__doc__, **options) @wraps(f) def doc_func(*args, **kwargs): kwargs['outer_decorator'] = self rv = self.process_rv(f, args, kwargs) return self.compute_return(f, rv) if not self.noargs: app.add_url_rule(self.route, f.__name__, doc_func) return doc_func def filter_api_url(self, endpoint, route_re, noargs): doc_methods = {'GET', 'HEAD', 'OPTIONS'} if re.match(route_re, endpoint['rule']): if endpoint['methods'] == doc_methods and not noargs: return False return True def build_examples(self, f, urls, args): """Build example documentation. 
Args: f: function urls: information relative to url for that function args: information relative to arguments for that function Yields: example based on default parameter value if any """ s = set() r = [] for data_url in urls: url = data_url['rule'] defaults = {arg['name']: arg['default'] for arg in args if arg['name'] in url} if defaults: url = url_for(f.__name__, **defaults) if url in s: continue s.add(url) r.append(url) return r def compute_return(self, f, rv): """Build documentation""" data = self.data if not f.__doc__: raise APIDocException('Apidoc %s: expected a docstring' ' for function %s' % (self.__class__.__name__, f.__name__)) data['docstring'] = f.__doc__ route_re = re.compile('.*%s$' % data['route']) endpoint_list = APIUrls.get_method_endpoints(f.__name__) data['urls'] = [url for url in endpoint_list if self.filter_api_url(url, route_re, data['noargs'])] if 'args' in data: data['examples'] = self.build_examples( f, data['urls'], data['args']) # Prepare and send to mimetype selector if it's not a doc request if re.match(route_re, request.url) and not data['noargs'] \ and request.method == 'GET': return app.response_class( render_template('apidoc.html', **data), content_type='text/html') g.doc_env = data # Store for response processing return rv @property def data(self): data = {'route': self.route, 'noargs': self.noargs} doc_instance = self.inner_dec while doc_instance: if isinstance(doc_instance, arg): if 'args' not in data: data['args'] = [] data['args'].append(doc_instance.data) elif isinstance(doc_instance, raises): if 'excs' not in data: data['excs'] = [] data['excs'].append(doc_instance.data) elif isinstance(doc_instance, returns): data['return'] = doc_instance.data elif isinstance(doc_instance, header): if 'headers' not in data: data['headers'] = [] data['headers'].append(doc_instance.data) + elif isinstance(doc_instance, param): + if 'params' not in data: + data['params'] = [] + + data['params'].append(doc_instance.data) else: raise 
APIDocException('Unknown API documentation decorator') doc_instance = doc_instance.inner_dec return data -class arg(APIDocBase): +class BaseDescribeDocBase(APIDocBase): + """Base description of optional input/output setup for a route. + + """ + def __init__(self): + self.doc_dict = None + self.inner_dec = None + + def __call__(self, f): + @wraps(f) + def arg_fun(*args, outer_decorator=None, **kwargs): + kwargs['outer_decorator'] = outer_decorator + return self.maintain_stack(f, args, kwargs) + return arg_fun + + @property + def data(self): + return self.doc_dict + + +class arg(BaseDescribeDocBase): """ Decorate an API method to display an argument's information on the doc page specified by @route above. Args: name: the argument's name. MUST match the method argument's name to create the example request URL. default: the argument's default value argtype: the argument's type as an Enum value from apidoc.argtypes argdoc: the argument's documentation string """ def __init__(self, name, default, argtype, argdoc): super().__init__() self.doc_dict = { 'name': name, 'type': argtype.value, 'doc': argdoc, 'default': default } - self.inner_dec = None - - def __call__(self, f): - @wraps(f) - def arg_fun(*args, outer_decorator=None, **kwargs): - kwargs['outer_decorator'] = outer_decorator - return self.maintain_stack(f, args, kwargs) - return arg_fun - @property - def data(self): - return self.doc_dict - -class header(APIDocBase): +class header(BaseDescribeDocBase): """ Decorate an API method to display header information the api can potentially return in the response. 
Args: name: the header name doc: the information about that header """ def __init__(self, name, doc): + super().__init__() self.doc_dict = { 'name': name, 'doc': doc, } - def __call__(self, f): - @wraps(f) - def arg_fun(*args, outer_decorator=None, **kwargs): - kwargs['outer_decorator'] = outer_decorator - return self.maintain_stack(f, args, kwargs) - return arg_fun - @property - def data(self): - return self.doc_dict +class param(BaseDescribeDocBase): + """Decorate an API method to display query parameter information the + api can potentially accept. + + Args: + name: the parameter name + default: default value + doc: the information about that header + + """ + def __init__(self, name, default, doc): + super().__init__() + self.doc_dict = { + 'name': name, + 'default': default, + 'doc': doc, + } class raises(APIDocBase): # noqa: N801 """ Decorate an API method to display information pertaining to an exception that can be raised by this method. Args: exc: the exception name as an Enum value from apidoc.excs doc: the exception's documentation string """ def __init__(self, exc, doc): super().__init__() self.exc_dict = { 'exc': exc.value, 'doc': doc } def __call__(self, f): @wraps(f) def exc_fun(*args, outer_decorator=None, **kwargs): kwargs['outer_decorator'] = outer_decorator return self.maintain_stack(f, args, kwargs) return exc_fun @property def data(self): return self.exc_dict class returns(APIDocBase): # noqa: N801 """ Decorate an API method to display information about its return value. 
Args: rettype: the return value's type as an Enum value from apidoc.rettypes retdoc: the return value's documentation string """ def __init__(self, rettype=None, retdoc=None): super().__init__() self.return_dict = { 'type': rettype.value, 'doc': retdoc } def __call__(self, f): @wraps(f) def ret_fun(*args, outer_decorator=None, **kwargs): kwargs['outer_decorator'] = outer_decorator return self.maintain_stack(f, args, kwargs) return ret_fun @property def data(self): return self.return_dict diff --git a/swh/web/ui/templates/apidoc.html b/swh/web/ui/templates/apidoc.html index 9e89a78dc..c2cb1c214 100644 --- a/swh/web/ui/templates/apidoc.html +++ b/swh/web/ui/templates/apidoc.html @@ -1,108 +1,119 @@ {% extends "layout.html" %} {% block title %}Software Heritage API{% endblock %} {% block content %} {% if docstring %}

Overview

{% autoescape off %} {{ docstring | safe_docstring_display }} {% endautoescape %}
{% endif %} {% if response_data and response_data is not none %}

Request

{{ request.method }} {{ request.url }}

Result

{% autoescape off %}{{ response_data | urlize_api_links }}{% endautoescape %}
{% if headers_data and headers_data is not none %}

Headers

{% for header_name, header_value in headers_data.items() %}
{{ header_name }}
{{ header_value | urlize_header_links | safe }}
{% endfor %}
{% endif %}
{% endif %}
URL Allowed Methods
{% for url in urls %}
{{ url['rule'] }} {{ url['methods'] | sort | join(', ') }}
{% endfor %}

{% if args and args|length > 0 %}

Args

{% for arg in args %}
{{ arg['name'] }}: {{ arg['type'] }}
{% autoescape off %} {{ arg['doc'] | safe_docstring_display }} {% endautoescape %}
{% endfor %}
{% endif %} +{% if params and params|length > 0 %} +
+

Params

+
+ {% for param in params %} +
{{ param['name'] }}: string
+
{% autoescape off %} {{ param['doc'] | safe_docstring_display }} {% endautoescape %}
+ {% endfor %} +
+
+{% endif %} {% if excs and excs|length > 0 %}

Raises

{% for exc in excs %}
{{ exc['exc'] }}
{% autoescape off %} {{ exc['doc'] | safe_docstring_display }} {% endautoescape %}
{% endfor %}
{% endif %} {% if headers %}

Headers

{% for header in headers %}
{{ header['name'] }}: string
{% autoescape off %} {{ header['doc'] | safe_docstring_display }} {% endautoescape %}
{% endfor %}
{% endif %} {% if return %}

Returns

{{ return['type'] }}
{% autoescape off %} {{ return['doc'] | safe_docstring_display }} {% endautoescape %}
{% endif %} {% if examples %}

Examples

{% for example in examples %}
{{ example }}
{% endfor %}
{% endif %} {% endblock %} diff --git a/swh/web/ui/tests/test_apidoc.py b/swh/web/ui/tests/test_apidoc.py index 77930edc6..6f0237f87 100644 --- a/swh/web/ui/tests/test_apidoc.py +++ b/swh/web/ui/tests/test_apidoc.py @@ -1,125 +1,127 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from nose.tools import istest, nottest from swh.web.ui import apidoc from swh.web.ui.main import app from swh.web.ui.tests.test_app import SWHApidocTestCase class APIDocTestCase(SWHApidocTestCase): def setUp(self): self.arg_dict = { 'name': 'my_pretty_arg', 'default': 'some default value', 'type': apidoc.argtypes.sha1, 'doc': 'this arg does things' } self.stub_excs = [{'exc': apidoc.excs.badinput, 'doc': 'My exception documentation'}] self.stub_args = [{'name': 'stub_arg', 'default': 'some_default'}] self.stub_rule_list = [ {'rule': 'some/route/with/args/', 'methods': {'GET', 'HEAD', 'OPTIONS'}}, {'rule': 'some/doc/route/', 'methods': {'GET', 'HEAD', 'OPTIONS'}}, {'rule': 'some/other/route/', 'methods': {'GET', 'HEAD', 'OPTIONS'}} ] self.stub_return = { 'type': apidoc.rettypes.dict.value, 'doc': 'a dict with amazing properties' } @apidoc.route('/my/nodoc/url/') @nottest def apidoc_nodoc_tester(arga, argb): return arga + argb @istest def apidoc_nodoc_failure(self): with self.assertRaises(Exception): self.client.get('/my/nodoc/url/') @istest def apidoc_badorder_failure(self): with self.assertRaises(AssertionError): @app.route('/my/badorder/url//') @apidoc.arg('foo', default=True, argtype=apidoc.argtypes.int, argdoc='It\'s so fluffy!') @apidoc.route('/my/badorder/url/') @nottest def apidoc_badorder_tester(foo, bar=0): """ Some irrelevant doc since the decorators are bad """ return foo + bar @app.route('/some///') @apidoc.route('/some/doc/route/') @nottest def apidoc_route_tester(myarg, 
myotherarg, akw=0): """ Sample doc """ return {'result': myarg + myotherarg + akw} @istest def apidoc_route_doc(self): # when rv = self.client.get('/some/doc/route/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('apidoc.html') @istest def apidoc_route_fn(self): # when rv = self.client.get('/some/1/1/') # then self.assertEqual(rv.status_code, 200) @app.route('/some/full///') @apidoc.route('/some/complete/doc/route/') @apidoc.arg('myarg', default=67, argtype=apidoc.argtypes.int, argdoc='my arg') + @apidoc.param('limit', default=10, doc='Result limitation') + @apidoc.header('Link', doc='Header link returns for pagination purpose') @apidoc.raises(exc=apidoc.excs.badinput, doc='Oops') @apidoc.returns(rettype=apidoc.rettypes.dict, retdoc='sum of args') @nottest def apidoc_full_stack_tester(myarg, myotherarg, akw=0): """ Sample doc """ return {'result': myarg + myotherarg + akw} @istest def apidoc_full_stack_doc(self): # when rv = self.client.get('/some/complete/doc/route/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('apidoc.html') @istest def apidoc_full_stack_fn(self): # when rv = self.client.get('/some/full/1/1/') # then self.assertEqual(rv.status_code, 200) diff --git a/swh/web/ui/views/api.py b/swh/web/ui/views/api.py index 6ececc102..b1dc2803a 100644 --- a/swh/web/ui/views/api.py +++ b/swh/web/ui/views/api.py @@ -1,1025 +1,1031 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from types import GeneratorType from flask import request, url_for from swh.web.ui import service, utils, apidoc as doc from swh.web.ui.exc import NotFoundExc from swh.web.ui.main import app @app.route('/api/1/stat/counters/') @doc.route('/api/1/stat/counters/', noargs=True) @doc.returns(rettype=doc.rettypes.dict, retdoc="A dictionary 
of SWH's most important statistics") def api_stats(): """Return statistics on SWH storage. """ return service.stat_counters() @app.route('/api/1/origin//visits/') @doc.route('/api/1/origin/visits/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc='The requested SWH origin identifier') @doc.returns(rettype=doc.rettypes.list, retdoc="""All instances of visits of the origin pointed by origin_id as POSIX time since epoch (if visit_id is not defined) """) def api_origin_visits(origin_id): """Return a list of origin visit (dict) for that particular origin including date (visit date as posix timestamp), target, target_type, status, ... """ def _enrich_origin_visit(origin_visit): ov = origin_visit.copy() ov['origin_visit_url'] = url_for('api_origin_visit', origin_id=ov['origin'], visit_id=ov['visit']) return ov return _api_lookup( origin_id, service.lookup_origin_visits, error_msg_if_not_found='No origin %s found' % origin_id, enrich_fn=_enrich_origin_visit) @app.route('/api/1/origin//visit//') @doc.route('/api/1/origin/visit/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc='The requested SWH origin identifier') @doc.arg('visit_id', default=1, argtype=doc.argtypes.int, argdoc='The requested SWH origin visit identifier') @doc.raises(exc=doc.excs.notfound, doc='Raised if no visit that match the query is found') @doc.returns(rettype=doc.rettypes.list, retdoc="""The single instance visit visit_id of the origin pointed by origin_id as POSIX time since epoch""") def api_origin_visit(origin_id, visit_id): """Return origin visit (dict) for that particular origin including (but not limited to) date (visit date as posix timestamp), target, target_type, status, ... 
""" def _enrich_origin_visit(origin_visit): ov = origin_visit.copy() ov['origin_url'] = url_for('api_origin', origin_id=ov['origin']) if 'occurrences' in ov: ov['occurrences'] = { k: utils.enrich_object(v) for k, v in ov['occurrences'].items() } return ov return _api_lookup( origin_id, service.lookup_origin_visit, 'No visit %s for origin %s found' % (visit_id, origin_id), _enrich_origin_visit, visit_id) @app.route('/api/1/content/symbol/', methods=['POST']) @app.route('/api/1/content/symbol//') @doc.route('/api/1/content/symbol/', tags=['upcoming']) @doc.arg('q', default='hello', argtype=doc.argtypes.str, argdoc="""An expression string to lookup in swh's raw content""") -@doc.header('Link', doc="""Optional 'Link' header proposed to the api consumer - for navigation purpose. possible are 'next' - or 'previous' page.""") +@doc.header('Link', + doc="""Optional 'Link' header proposed to the api consumer + for navigation purpose. possible are 'next' + or 'previous' page.""") +@doc.param('per_page', default=10, + doc="""Optional parameter which permits + to limit the number of data to retrieve on a per request + basis. + The default is 10, up to 50 max.""") @doc.returns(rettype=doc.rettypes.list, retdoc="""A list of dict whose content matches the expression. Each dict has the following keys: - id (bytes): identifier of the content - name (text): symbol whose content match the expression - kind (text): kind of the symbol that matched - lang (text): Language for that entry - line (int): Number line for the symbol The result is paginated by page of 10 results. The 'Link' header gives the relation to follow for the next and eventually the previous page. """) def api_content_symbol(q=None): """Search symbol in indexed content's data. The result is paginated and and Link header will give the next and previous link to have the next result. 
""" result = {} last_sha1 = request.args.get('last_sha1', None) per_page = int(request.args.get('per_page', '10')) def lookup_exp(exp, last_sha1=last_sha1, per_page=per_page): return service.lookup_expression(exp, last_sha1, per_page) symbols = _api_lookup( q, lookup_fn=lookup_exp, error_msg_if_not_found='No indexed raw content match expression \'' '%s\'.' % q, enrich_fn=lambda x: utils.enrich_content(x, top_url=True)) if symbols: l = len(symbols) url = url_for('api_content_symbol', q=q) headers = {} if l == per_page: new_last_sha1 = symbols[-1]['sha1'] params = [('last_sha1', new_last_sha1)] nb_per_page = request.args.get('per_page') if nb_per_page: params.append(('per_page', nb_per_page)) headers['link-next'] = utils.to_url(url, params) result['headers'] = headers result.update({ 'results': symbols }) return result @app.route('/api/1/content/known/', methods=['POST']) @app.route('/api/1/content/known//') @doc.route('/api/1/content/known/') @doc.arg('q', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=doc.argtypes.algo_and_hash, argdoc="""An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH""") @doc.raises(exc=doc.excs.badinput, doc='Raised if q is not well formed') @doc.returns(rettype=doc.rettypes.dict, retdoc="""A dict with keys: - search_res: a list of dicts corresponding to queried content with key 'found' to True if found, 'False' if not - search_stats: a dict containing number of files searched and percentage of files found """) def api_check_content_known(q=None): """Search a content per hash. This may take the form of: - a GET request with many hashes (limited to the size of parameter we can pass in url) a POST request with many hashes, with the - request body containing identifiers (typically filenames) as - keys and corresponding hashes as values. 
""" response = {'search_res': None, 'search_stats': None} search_stats = {'nbfiles': 0, 'pct': 0} search_res = None queries = [] # GET: Many hash separated values request if q: hashes = q.split(',') for v in hashes: queries.append({'filename': None, 'sha1': v}) # POST: Many hash requests in post form submission elif request.method == 'POST': data = request.form # Remove potential inputs with no associated value for k, v in data.items(): if v is not None: if k == 'q' and len(v) > 0: queries.append({'filename': None, 'sha1': v}) elif v != '': queries.append({'filename': k, 'sha1': v}) if queries: lookup = service.lookup_multiple_hashes(queries) result = [] l = len(queries) for el in lookup: result.append({'filename': el['filename'], 'sha1': el['sha1'], 'found': el['found']}) search_res = result nbfound = len([x for x in lookup if x['found']]) search_stats['nbfiles'] = l search_stats['pct'] = (nbfound / l) * 100 response['search_res'] = search_res response['search_stats'] = search_stats return response def _api_lookup(criteria, lookup_fn, error_msg_if_not_found, enrich_fn=lambda x: x, *args): """Capture a redundant behavior of: - looking up the backend with a criteria (be it an identifier or checksum) passed to the function lookup_fn - if nothing is found, raise an NotFoundExc exception with error message error_msg_if_not_found. - Otherwise if something is returned: - either as list, map or generator, map the enrich_fn function to it and return the resulting data structure as list. - either as dict and pass to enrich_fn and return the dict enriched. Args: - criteria: discriminating criteria to lookup - lookup_fn: function expects one criteria and optional supplementary *args. - error_msg_if_not_found: if nothing matching the criteria is found, raise NotFoundExc with this error message. - enrich_fn: Function to use to enrich the result returned by lookup_fn. Default to the identity function if not provided. - *args: supplementary arguments to pass to lookup_fn. 
Raises: NotFoundExp or whatever `lookup_fn` raises. """ res = lookup_fn(criteria, *args) if not res: raise NotFoundExc(error_msg_if_not_found) if isinstance(res, (map, list, GeneratorType)): return [enrich_fn(x) for x in res] return enrich_fn(res) @app.route('/api/1/origin//') @app.route('/api/1/origin//url//') @doc.route('/api/1/origin/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The origin's SWH origin_id.") @doc.arg('origin_type', default='git', argtype=doc.argtypes.str, argdoc="The origin's type (git, svn..)") @doc.arg('origin_url', default='https://github.com/hylang/hy', argtype=doc.argtypes.path, argdoc="The origin's URL.") @doc.raises(exc=doc.excs.notfound, doc='Raised if origin_id does not correspond to an origin in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the origin identified by origin_id') def api_origin(origin_id=None, origin_type=None, origin_url=None): """Return information about the origin matching the passed criteria. Criteria may be: - An SWH-specific ID, if you already know it - An origin type and its URL, if you do not have the origin's SWH identifier """ ori_dict = { 'id': origin_id, 'type': origin_type, 'url': origin_url } ori_dict = {k: v for k, v in ori_dict.items() if ori_dict[k]} if 'id' in ori_dict: error_msg = 'Origin with id %s not found.' 
% ori_dict['id'] else: error_msg = 'Origin with type %s and URL %s not found' % ( ori_dict['type'], ori_dict['url']) def _enrich_origin(origin): if 'id' in origin: o = origin.copy() o['origin_visits_url'] = url_for('api_origin_visits', origin_id=o['id']) return o return origin return _api_lookup( ori_dict, lookup_fn=service.lookup_origin, error_msg_if_not_found=error_msg, enrich_fn=_enrich_origin)


# NOTE(review): the line above is the tail of a definition that starts before
# this chunk; it is left exactly as found.
#
# NOTE(review): several URL rules below contain empty path segments
# (e.g. '/api/1/person//') although the decorated views take required
# arguments.  The '<converter:name>' placeholders were presumably lost when
# this text was extracted -- restore them from the repository before relying
# on these rules.
#
# NOTE(review): _api_lookup, service, utils, doc and NotFoundExc are defined
# or imported outside this chunk; the comments below only describe what each
# call site passes to them.


@app.route('/api/1/person//')
@doc.route('/api/1/person/')
@doc.arg('person_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc="The person's SWH identifier")
# NOTE(review): the text below says "an origin"; it presumably should say
# "a person".  Left unchanged because it is a runtime string.
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if person_id does not correspond to an origin in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the person identified by person_id')
def api_person(person_id):
    """Return information about person with identifier person_id.

    """
    # Looks the person up through service.lookup_person; no enrich_fn is
    # passed, so the lookup result is not decorated with extra URLs here.
    return _api_lookup(
        person_id, lookup_fn=service.lookup_person,
        error_msg_if_not_found='Person with id %s not found.' % person_id)


@app.route('/api/1/release//')
@doc.route('/api/1/release/')
@doc.arg('sha1_git',
         default='97d8dcd0c589b1d94a5d26cf0c1e8f2f44b92bfd',
         argtype=doc.argtypes.sha1_git,
         argdoc="The release's sha1_git identifier")
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if the argument is not a sha1')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if sha1_git does not correspond to a release in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the release identified by sha1_git')
def api_release(sha1_git):
    """Return information about release with id sha1_git.

    """
    error_msg = 'Release with sha1_git %s not found.' % sha1_git
    # The result of service.lookup_release is passed through
    # utils.enrich_release by _api_lookup.
    return _api_lookup(
        sha1_git,
        lookup_fn=service.lookup_release,
        error_msg_if_not_found=error_msg,
        enrich_fn=utils.enrich_release)


def _revision_directory_by(revision, path, request_path,
                           limit=100, with_data=False):
    """Compute the revision matching criterion's directory or content data.

    Args:
        revision: dictionary of criterions representing a revision to lookup
        path: directory's path to lookup
        request_path: request path, used as the context URL when enriching
            directory entries
        limit: optional query parameter to limit the revisions log
            (default to 100). It is forwarded unchanged to
            service.lookup_directory_through_revision.
            NOTE(review): the original text added that this limit "could
            impede the transitivity conclusion about sha1_git not being an
            ancestor of" and stopped mid-sentence.
        with_data: indicate to retrieve the content's raw data if path
            resolves to a content.

    Returns:
        The second element returned by
        service.lookup_directory_through_revision, with its 'content' key
        replaced in place: a list of enriched directory entries when
        result['type'] is 'dir', the enriched content otherwise.

    """
    # NOTE(review): the parameter name `dir` shadows the builtin of the same
    # name inside this helper.
    def enrich_directory_local(dir, context_url=request_path):
        return utils.enrich_directory(dir, context_url)

    # NOTE(review): rev_id is unpacked but never used below.
    rev_id, result = service.lookup_directory_through_revision(
        revision, path, limit=limit, with_data=with_data)

    content = result['content']
    if result['type'] == 'dir':  # dir_entries
        result['content'] = list(map(enrich_directory_local, content))
    else:  # content
        result['content'] = utils.enrich_content(content)

    return result


@app.route('/api/1/revision'
           '/origin/'
           '/directory/')
@app.route('/api/1/revision'
           '/origin/'
           '/directory//')
@app.route('/api/1/revision'
           '/origin/'
           '/branch/'
           '/directory/')
@app.route('/api/1/revision'
           '/origin/'
           '/branch/'
           '/directory//')
@app.route('/api/1/revision'
           '/origin/'
           '/branch/'
           '/ts/'
           '/directory/')
@app.route('/api/1/revision'
           '/origin/'
           '/branch/'
           '/ts/'
           '/directory//')
@doc.route('/api/1/revision/origin/directory/')
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc="The revision's origin's SWH identifier")
# NOTE(review): the argdoc below is missing its closing parenthesis; left
# unchanged because it is a runtime string.
@doc.arg('branch_name',
         default='refs/heads/master',
         argtype=doc.argtypes.path,
         argdoc="""The optional branch for the given origin (default to master""")
@doc.arg('ts',
         default='2000-01-17T11:23:54+00:00',
         argtype=doc.argtypes.ts,
         argdoc="""Optional timestamp (default to the nearest time crawl of timestamp)""")
@doc.arg('path',
         default='Dockerfile',
         argtype=doc.argtypes.path,
         argdoc='The path to the directory or file to display')
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a revision matching the passed criteria was not found""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata of the revision corresponding to the passed criteria""")
def api_directory_through_revision_origin(origin_id,
                                          branch_name="refs/heads/master",
                                          ts=None,
                                          path=None,
                                          with_data=False):
    """Display directory or content information through a revision identified
    by origin/branch/timestamp.

    """
    # A falsy ts (None or empty string) is passed through untouched; any
    # other value is converted by utils.parse_timestamp first.
    if ts:
        ts = utils.parse_timestamp(ts)

    # The current request path is handed over as the enrichment context.
    return _revision_directory_by(
        {
            'origin_id': origin_id,
            'branch_name': branch_name,
            'ts': ts
        },
        path,
        request.path,
        with_data=with_data)


@app.route('/api/1/revision'
           '/origin//')
@app.route('/api/1/revision'
           '/origin/'
           '/branch//')
@app.route('/api/1/revision'
           '/origin/'
           '/branch/'
           '/ts//')
@app.route('/api/1/revision'
           '/origin/'
           '/ts//')
@doc.route('/api/1/revision/origin/')
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc="The queried revision's origin identifier in SWH")
@doc.arg('branch_name',
         default='refs/heads/master',
         argtype=doc.argtypes.path,
         argdoc="""The optional branch for the given origin (default to master)""")
@doc.arg('ts',
         default='2000-01-17T11:23:54+00:00',
         argtype=doc.argtypes.ts,
         argdoc="The time at which the queried revision should be constrained")
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a revision matching given criteria was not found in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata of the revision identified by the given criteria""")
def api_revision_with_origin(origin_id,
                             branch_name="refs/heads/master",
                             ts=None):
    """Display revision information through its identification by
    origin/branch/timestamp.

    """
    if ts:
        ts = utils.parse_timestamp(ts)

    # Positional call: lookup function, not-found message, enrich function,
    # then branch_name and ts as trailing arguments (presumably forwarded to
    # service.lookup_revision_by -- confirm in _api_lookup).  The message
    # embeds ts after parsing, not the raw URL value.
    return _api_lookup(
        origin_id,
        service.lookup_revision_by,
        'Revision with (origin_id: %s, branch_name: %s'
        ', ts: %s) not found.' % (origin_id, branch_name, ts),
        utils.enrich_revision,
        branch_name,
        ts)


@app.route('/api/1/revision//')
@app.route('/api/1/revision//prev//')
@doc.route('/api/1/revision/')
@doc.arg('sha1_git',
         default='ec72c666fb345ea5f21359b7bc063710ce558e39',
         argtype=doc.argtypes.sha1_git,
         argdoc="The revision's sha1_git identifier")
@doc.arg('context',
         default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a',
         argtype=doc.argtypes.path,
         argdoc='The navigation breadcrumbs -- use at your own risk')
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a revision matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the revision identified by sha1_git')
def api_revision(sha1_git, context=None):
    """Return information about revision with id sha1_git.

    """
    # Local wrapper so that the navigation context is passed to
    # utils.enrich_revision together with the revision.
    def _enrich_revision(revision, context=context):
        return utils.enrich_revision(revision, context)

    return _api_lookup(
        sha1_git,
        service.lookup_revision,
        'Revision with sha1_git %s not found.' % sha1_git,
        _enrich_revision)


@app.route('/api/1/revision//raw/')
@doc.route('/api/1/revision/raw/')
@doc.arg('sha1_git',
         default='ec72c666fb345ea5f21359b7bc063710ce558e39',
         argtype=doc.argtypes.sha1_git,
         argdoc="The queried revision's sha1_git identifier")
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a revision matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.octet_stream,
             retdoc="""The message of the revision identified by sha1_git as a downloadable octet stream""")
def api_revision_raw_message(sha1_git):
    """Return the raw data of the message of revision identified by sha1_git

    """
    # This view does not go through _api_lookup and has no not-found check of
    # its own; presumably service.lookup_revision_message raises for bad or
    # unknown identifiers -- confirm.
    raw = service.lookup_revision_message(sha1_git)
    # The two adjacent literals are concatenated before '%' is applied, so
    # the header value is 'attachment;filename=rev_<sha1_git>_raw'.
    return app.response_class(raw['message'],
                              headers={'Content-disposition': 'attachment;'
                                       'filename=rev_%s_raw' % sha1_git},
                              mimetype='application/octet-stream')


@app.route('/api/1/revision//directory/')
@app.route('/api/1/revision//directory//')
@doc.route('/api/1/revision/directory/')
@doc.arg('sha1_git',
         default='ec72c666fb345ea5f21359b7bc063710ce558e39',
         argtype=doc.argtypes.sha1_git,
         argdoc="The revision's sha1_git identifier.")
@doc.arg('dir_path',
         default='Documentation/BUG-HUNTING',
         argtype=doc.argtypes.path,
         argdoc='The path from the top level directory')
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a revision matching sha1_git was not found in SWH , or if the path specified does not exist""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata of the directory pointed by revision id sha1-git and dir_path""")
def api_revision_directory(sha1_git,
                           dir_path=None,
                           with_data=False):
    """Return information on directory pointed by revision with sha1_git.
    If dir_path is not provided, display top level directory.
    Otherwise, display the directory pointed by dir_path (if it exists).

    """
    # Same helper as the origin-based view, with the revision identified by
    # its sha1_git only.
    return _revision_directory_by(
        {
            'sha1_git': sha1_git
        },
        dir_path,
        request.path,
        with_data=with_data)


@app.route('/api/1/revision//log/')
@app.route('/api/1/revision//prev//log/')
@doc.route('/api/1/revision/log/')
@doc.arg('sha1_git',
         default='ec72c666fb345ea5f21359b7bc063710ce558e39',
         argtype=doc.argtypes.sha1_git,
         argdoc="The revision's identifier queried")
@doc.arg('prev_sha1s',
         default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a',
         argtype=doc.argtypes.path,
         argdoc="""(Optional) Navigation breadcrumbs (descendant revisions previously visited). If multiple values, use / as delimiter. """)
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if sha1_git or prev_sha1s is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a revision matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The log data starting at the revision identified by sha1_git, completed with the navigation breadcrumbs, if any""")
def api_revision_log(sha1_git, prev_sha1s=None):
    """Show all revisions (~git log) starting from sha1_git.
    The first element returned is the given sha1_git, or the first
    breadcrumb, if any.

    The result is paginated. To browse for the following revisions, use the
    link mentioned in the 'next_revs_url' key.

    """
    # Page size comes from the application configuration.
    limit = app.config['conf']['max_log_revs']

    response = {'revisions': None, 'next_revs_url': None}
    revisions = None
    next_revs_url = None

    # One revision more than the page size is requested so that the presence
    # of a following page can be detected; the default is bound when this
    # inner function is defined.
    def lookup_revision_log_with_limit(s, limit=limit+1):
        return service.lookup_revision_log(s, limit)

    error_msg = 'Revision with sha1_git %s not found.' % sha1_git
    rev_get = _api_lookup(sha1_git,
                          lookup_fn=lookup_revision_log_with_limit,
                          error_msg_if_not_found=error_msg,
                          enrich_fn=utils.enrich_revision)

    if len(rev_get) == limit+1:
        # The extra revision is dropped from this page and its id becomes the
        # starting point of the "next" link.
        rev_backward = rev_get[:-1]
        next_revs_url = url_for('api_revision_log',
                                sha1_git=rev_get[-1]['id'])
    else:
        rev_backward = rev_get

    if not prev_sha1s:  # no nav breadcrumbs, so we're done
        revisions = rev_backward

    else:
        # Breadcrumbs are '/'-separated ids; they are looked up in one call
        # and placed in front of the log.
        # NOTE(review): the not-found message reused here names sha1_git, not
        # the breadcrumb ids.
        rev_forward_ids = prev_sha1s.split('/')
        rev_forward = _api_lookup(rev_forward_ids,
                                  lookup_fn=service.lookup_revision_multiple,
                                  error_msg_if_not_found=error_msg,
                                  enrich_fn=utils.enrich_revision)
        revisions = rev_forward + rev_backward

    response['revisions'] = revisions
    response['next_revs_url'] = next_revs_url

    return response


@app.route('/api/1/revision'
           '/origin//log/')
@app.route('/api/1/revision'
           '/origin/'
           '/branch//log/')
@app.route('/api/1/revision'
           '/origin/'
           '/branch/'
           '/ts//log/')
@app.route('/api/1/revision'
           '/origin/'
           '/ts//log/')
@doc.route('/api/1/revision/origin/log/')
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc="The revision's SWH origin identifier")
@doc.arg('branch_name',
         default='refs/heads/master',
         argtype=doc.argtypes.path,
         argdoc="""(Optional) The revision's branch name within the origin specified. Defaults to 'refs/heads/master'.""")
@doc.arg('ts',
         default='2000-01-17T11:23:54+00:00',
         argtype=doc.argtypes.ts,
         argdoc="""(Optional) A time or timestamp string to parse""")
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a revision matching the given criteria was not found in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata of the revision log starting at the revision matching the given criteria.""")
def api_revision_log_by(origin_id,
                        branch_name='refs/heads/master',
                        ts=None):
    """Show all revisions (~git log) starting from the revision targeted by
    the origin_id provided and optionally a branch name or/and a timestamp.

    The result is paginated. To browse the following revisions, use the link
    mentioned in the 'next_revs_url' key.

    """
    # Page size comes from the application configuration.
    limit = app.config['conf']['max_log_revs']
    response = {'revisions': None, 'next_revs_url': None}
    next_revs_url = None

    if ts:
        ts = utils.parse_timestamp(ts)

    # Same "page size + 1" probe as in api_revision_log.
    def lookup_revision_log_by_with_limit(o_id, br, ts, limit=limit+1):
        return service.lookup_revision_log_by(o_id, br, ts, limit)

    # NOTE(review): the first fragment ends with a space, so the message
    # reads "origin <id> , branch name ..." with a space before the comma.
    error_msg = 'No revision matching origin %s ' % origin_id
    error_msg += ', branch name %s' % branch_name
    error_msg += (' and time stamp %s.' % ts) if ts else '.'

    # branch_name and ts are passed as trailing positional arguments
    # (presumably forwarded to the lookup function -- confirm in _api_lookup).
    rev_get = _api_lookup(origin_id,
                          lookup_revision_log_by_with_limit,
                          error_msg,
                          utils.enrich_revision,
                          branch_name,
                          ts)
    if len(rev_get) == limit+1:
        revisions = rev_get[:-1]
        # The "next" link targets the sha1_git-based log endpoint, not this
        # origin-based one.
        next_revs_url = url_for('api_revision_log',
                                sha1_git=rev_get[-1]['id'])
    else:
        revisions = rev_get
    response['revisions'] = revisions
    response['next_revs_url'] = next_revs_url

    return response


@app.route('/api/1/directory//')
@app.route('/api/1/directory///')
@doc.route('/api/1/directory/')
@doc.arg('sha1_git',
         default='1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8',
         argtype=doc.argtypes.sha1_git,
         argdoc="The queried directory's corresponding sha1_git hash")
@doc.arg('path',
         default='codec/demux',
         argtype=doc.argtypes.path,
         argdoc="A path relative to the queried directory's top level")
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a directory matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata and contents of the directory identified by sha1_git""")
def api_directory(sha1_git,
                  path=None):
    """Return information about directory with id sha1_git.

    """
    if path:
        # A truthy path selects the path-aware lookup; path is passed as a
        # trailing positional argument to _api_lookup.
        error_msg_path = ('Entry with path %s relative to directory '
                          'with sha1_git %s not found.') % (path, sha1_git)
        return _api_lookup(
            sha1_git,
            service.lookup_directory_with_path,
            error_msg_path,
            utils.enrich_directory,
            path)
    else:
        error_msg_nopath = 'Directory with sha1_git %s not found.' % sha1_git
        return _api_lookup(
            sha1_git,
            service.lookup_directory,
            error_msg_nopath,
            utils.enrich_directory)


@app.route('/api/1/provenance//')
@doc.route('/api/1/provenance/', tags=['hidden'])
@doc.arg('q',
         default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242',
         argtype=doc.argtypes.algo_and_hash,
         argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""")
@doc.raises(exc=doc.excs.badinput,
            doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""")
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a content matching the hash was not found in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""List of provenance information (dict) for the matched content.""")
def api_content_provenance(q):
    """Return content's provenance information if any.

    """
    # Works on a copy of the provenance entry and adds links built from its
    # 'revision', 'content', 'origin' and 'visit' keys.
    def _enrich_revision(provenance):
        p = provenance.copy()
        p['revision_url'] = url_for('api_revision',
                                    sha1_git=provenance['revision'])
        p['content_url'] = url_for('api_content_metadata',
                                   q='sha1_git:%s' % provenance['content'])
        p['origin_url'] = url_for('api_origin',
                                  origin_id=provenance['origin'])
        p['origin_visits_url'] = url_for('api_origin_visits',
                                         origin_id=provenance['origin'])
        p['origin_visit_url'] = url_for('api_origin_visit',
                                        origin_id=provenance['origin'],
                                        visit_id=provenance['visit'])
        return p

    return _api_lookup(
        q,
        lookup_fn=service.lookup_content_provenance,
        error_msg_if_not_found='Content with %s not found.' % q,
        enrich_fn=_enrich_revision)


@app.route('/api/1/filetype//')
@doc.route('/api/1/filetype/', tags=['upcoming'])
@doc.arg('q',
         default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
         argtype=doc.argtypes.algo_and_hash,
         argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""")
@doc.raises(exc=doc.excs.badinput,
            doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""")
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a content matching the hash was not found in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""Filetype information (dict) for the matched content.""")
def api_content_filetype(q):
    """Return content's filetype information if any.

    """
    # Shares utils.enrich_metadata_endpoint with the language, license and
    # ctags views below.
    return _api_lookup(
        q,
        lookup_fn=service.lookup_content_filetype,
        error_msg_if_not_found='No filetype information found '
        'for content %s.' % q,
        enrich_fn=utils.enrich_metadata_endpoint)


@app.route('/api/1/language//')
@doc.route('/api/1/language/', tags=['upcoming'])
@doc.arg('q',
         default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
         argtype=doc.argtypes.algo_and_hash,
         argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""")
@doc.raises(exc=doc.excs.badinput,
            doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""")
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a content matching the hash was not found in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""Language information (dict) for the matched content.""")
def api_content_language(q):
    """Return content's language information if any.

    """
    return _api_lookup(
        q,
        lookup_fn=service.lookup_content_language,
        error_msg_if_not_found='No language information found '
        'for content %s.' % q,
        enrich_fn=utils.enrich_metadata_endpoint)


@app.route('/api/1/license//')
@doc.route('/api/1/license/', tags=['upcoming'])
@doc.arg('q',
         default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
         argtype=doc.argtypes.algo_and_hash,
         argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""")
@doc.raises(exc=doc.excs.badinput,
            doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""")
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a content matching the hash was not found in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""License information (dict) for the matched content.""")
def api_content_license(q):
    """Return content's license information if any.

    """
    return _api_lookup(
        q,
        lookup_fn=service.lookup_content_license,
        error_msg_if_not_found='No license information found '
        'for content %s.' % q,
        enrich_fn=utils.enrich_metadata_endpoint)


@app.route('/api/1/ctags//')
@doc.route('/api/1/ctags/', tags=['upcoming'])
@doc.arg('q',
         default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
         argtype=doc.argtypes.algo_and_hash,
         argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""")
@doc.raises(exc=doc.excs.badinput,
            doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""")
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a content matching the hash was not found in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""Ctags symbol (dict) for the matched content.""")
def api_content_ctags(q):
    """Return content's ctags symbols if any.

    """
    return _api_lookup(
        q,
        lookup_fn=service.lookup_content_ctags,
        error_msg_if_not_found='No ctags symbol found '
        'for content %s.' % q,
        enrich_fn=utils.enrich_metadata_endpoint)


@app.route('/api/1/content//raw/')
@doc.route('/api/1/content/raw/', tags=['hidden'])
@doc.arg('q',
         default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
         argtype=doc.argtypes.algo_and_hash,
         argdoc="""An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH. Defaults to sha1 in the case of a missing algo_hash """)
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if q is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a content matching q was not found in SWH')
@doc.returns(rettype=doc.rettypes.octet_stream,
             retdoc='The raw content data as an octet stream')
def api_content_raw(q):
    """Return content's raw data if content is found.

    """
    # Generator handed to the response class; it yields the whole
    # content['data'] value as a single chunk.
    def generate(content):
        yield content['data']

    content = service.lookup_content_raw(q)
    # Unlike the metadata views, the not-found case is handled here directly
    # instead of through _api_lookup.
    if not content:
        raise NotFoundExc('Content with %s not found.' % q)

    # The two adjacent literals are concatenated before '%' is applied, so
    # the header value is 'attachment;filename=content_<q>_raw'.
    return app.response_class(generate(content),
                              headers={'Content-disposition': 'attachment;'
                                       'filename=content_%s_raw' % q},
                              mimetype='application/octet-stream')


@app.route('/api/1/content//')
@doc.route('/api/1/content/')
@doc.arg('q',
         default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
         argtype=doc.argtypes.algo_and_hash,
         argdoc="""An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH. Defaults to sha1 in the case of a missing algo_hash """)
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if q is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a content matching q was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata of the content identified by q. If content decoding was successful, it also returns the data""")
def api_content_metadata(q):
    """Return content information if content is found.

    """
    return _api_lookup(
        q,
        lookup_fn=service.lookup_content,
        error_msg_if_not_found='Content with %s not found.' % q,
        enrich_fn=utils.enrich_content)


@app.route('/api/1/entity//')
@doc.route('/api/1/entity/', tags=['hidden'])
@doc.arg('uuid',
         default='5f4d4c51-498a-4e28-88b3-b3e4e8396cba',
         argtype=doc.argtypes.uuid,
         argdoc="The entity's uuid identifier")
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if uuid is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if an entity matching uuid was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the entity identified by uuid')
def api_entity_by_uuid(uuid):
    """Return entity information if entity is found.

    """
    # The result of service.lookup_entity_by_uuid is passed through
    # utils.enrich_entity by _api_lookup.
    return _api_lookup(
        uuid,
        lookup_fn=service.lookup_entity_by_uuid,
        error_msg_if_not_found="Entity with uuid '%s' not found." % uuid,
        enrich_fn=utils.enrich_entity)